﻿// ------------------------------------------------------------------------
//  Copyright 2025 The Dapr Authors
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//      http://www.apache.org/licenses/LICENSE-2.0
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
//  ------------------------------------------------------------------------

#nullable enable
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Google.Protobuf;
using Grpc.Core;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously

namespace Dapr.Client.Crypto;

/// <summary>
/// Provides the implementation to encrypt a stream of plaintext data with the Dapr runtime.
/// </summary>
internal sealed class EncryptionStreamProcessor : IDisposable
{
    private bool disposed;

    // Unbounded hand-off queue: the background response reader writes payloads here and
    // GetProcessedDataAsync drains them.
    private readonly Channel<ReadOnlyMemory<byte>> outputChannel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>();

    /// <summary>
    /// Surfaces any exceptions encountered while asynchronously processing the inbound and outbound streams.
    /// </summary>
    internal event EventHandler<Exception>? OnException;

    /// <summary>
    /// Sends the provided bytes in chunks to the sidecar for the encryption operation.
    /// </summary>
    /// <remarks>
    /// Two background tasks are started: one reads <paramref name="inputStream"/> and writes requests to the
    /// call, the other reads responses from the call and queues their payloads for
    /// <see cref="GetProcessedDataAsync"/>. The returned task does not wait for either of them; failures on the
    /// background tasks are reported through <see cref="OnException"/>.
    /// </remarks>
    /// <param name="inputStream">The stream containing the bytes to encrypt.</param>
    /// <param name="call">The call to make to the sidecar to process the encryption operation.</param>
    /// <param name="options">The encryption options.</param>
    /// <param name="streamingBlockSizeInBytes">The size, in bytes, of the streaming blocks.</param>
    /// <param name="cancellationToken">Token used to cancel the ongoing request.</param>
    public async Task ProcessStreamAsync(
        Stream inputStream,
        AsyncDuplexStreamingCall<Autogenerated.EncryptRequest, Autogenerated.EncryptResponse> call,
        Autogenerated.EncryptRequestOptions options,
        int streamingBlockSizeInBytes,
        CancellationToken cancellationToken)
    {
        //Read from the input stream and write to the gRPC call
        _ = Task.Run(async () =>
        {
            // Tracks whether a send failure was already reported so that a follow-up failure while
            // completing the request stream does not mask the original error.
            var sendFailed = false;

            try
            {
                // NOTE(review): disposing the BufferedStream is expected to dispose inputStream as well -
                // confirm callers rely on this before changing it.
                await using var bufferedStream = new BufferedStream(inputStream, streamingBlockSizeInBytes);
                var buffer = new byte[streamingBlockSizeInBytes];
                int bytesRead;
                ulong sequenceNumber = 0;

                while ((bytesRead = await bufferedStream.ReadAsync(buffer, cancellationToken)) > 0)
                {
                    var request = new Autogenerated.EncryptRequest
                    {
                        Payload = new Autogenerated.StreamPayload
                        {
                            // Copy the chunk: the buffer is reused on the next iteration
                            Data = ByteString.CopyFrom(buffer, 0, bytesRead), Seq = sequenceNumber
                        }
                    };

                    //Only include the options in the first message
                    if (sequenceNumber == 0)
                    {
                        request.Options = options;
                    }

                    await call.RequestStream.WriteAsync(request, cancellationToken);

                    //Increment the sequence number
                    sequenceNumber++;
                }
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Expected cancellation exception
            }
            catch (Exception ex)
            {
                sendFailed = true;
                OnException?.Invoke(this, ex);
            }
            finally
            {
                // Always signal the end of the request stream, but never let a failure here escape the
                // fire-and-forget task where nobody would observe it.
                try
                {
                    await call.RequestStream.CompleteAsync();
                }
                catch (Exception ex)
                {
                    // Only surface the completion failure when it is the first error and was not
                    // caused by the caller cancelling the operation.
                    if (!sendFailed && !cancellationToken.IsCancellationRequested)
                    {
                        OnException?.Invoke(this, ex);
                    }
                }
            }
        }, cancellationToken);

        //Start reading from the gRPC call and writing to the output channel
        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken))
                {
                    await outputChannel.Writer.WriteAsync(response.Payload.Data.Memory, cancellationToken);
                }
            }
            catch (Exception ex)
            {
                OnException?.Invoke(this, ex);
            }
            finally
            {
                // TryComplete rather than Complete: Dispose may already have completed the channel, and
                // Complete would then throw from this finally block on an unobserved task.
                outputChannel.Writer.TryComplete();
            }
        }, cancellationToken);
    }

    /// <summary>
    /// Retrieves the processed bytes from the operation from the sidecar and
    /// returns as an enumerable stream.
    /// </summary>
    /// <param name="cancellationToken">Token used to stop waiting for further processed data.</param>
    /// <returns>The payloads queued by the background response reader, in the order they were queued.</returns>
    public async IAsyncEnumerable<ReadOnlyMemory<byte>> GetProcessedDataAsync([EnumeratorCancellation] CancellationToken cancellationToken)
    {
        await foreach (var data in outputChannel.Reader.ReadAllAsync(cancellationToken))
        {
            yield return data;
        }
    }

    /// <summary>
    /// Marks the output channel as complete so no further data can be queued.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Performs the one-time cleanup; subsequent calls are no-ops.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
    private void Dispose(bool disposing)
    {
        if (disposed)
        {
            return;
        }

        if (disposing)
        {
            // TryComplete is a no-op when the background reader already completed the channel
            outputChannel.Writer.TryComplete();
        }

        disposed = true;
    }
}
